Drafting the business problem statement. A unique quality of yellow cabs is that they can pick up passengers from the street without any pre-arranged booking, whereas Uber passengers must arrange the ride through the app. This suggests a business opportunity for yellow cabs: target the streets where pickup demand is high. If passengers wait too long, they switch to an app to find a taxi, but if a yellow cab is right in front of them, they will usually choose it over waiting for an Uber. This data science project therefore focuses on helping NYC TLC yellow taxis get back on track and increase the number of pickups across New York City. Yellow cab drivers can also use the website to see the forecasted demand for the different zones in NYC.
Assess resource needs and availability. Resources needed: yellow taxi pickup records from past years, day-by-day weather data for the past year, and the NYC taxi operating zone dataset, all required to make any projection into the future. Resource availability: the yellow taxi dataset is publicly available. The attached datasets were collected and provided to the NYC Taxi and Limousine Commission (TLC) by technology providers authorized under the Taxicab & Livery Passenger Enhancement Programs.
Data gathering is the first step of the machine learning life cycle. The goal of this step is to identify and obtain all the data the problem requires. Data can be collected from various sources such as files, databases, the internet, or mobile devices, so we first need to identify those sources. This is one of the most important steps of the life cycle: the quantity and quality of the collected data determine the quality of the output, and more data generally means more accurate predictions. This step includes identifying the data sources, collecting the data, and integrating it into a usable dataset.
This public dataset was created by the National Oceanic and Atmospheric Administration (NOAA) and includes global data obtained from the USAF Climatology Center. It covers GSOD data from 1929 to the present (updated daily), collected from over 9,000 stations.
It consists of a shapefile that was converted into a CSV file using an online converter and then processed with pandas. It covers the 263 zones in NYC where yellow taxis operate.
The NYC yellow taxi dataset is openly available on the NYC government site. The data in the attached datasets were collected and provided to the NYC Taxi and Limousine Commission (TLC) by technology providers authorized under the Taxicab & Livery Passenger Enhancement Programs.
The yellow taxi trip records include fields capturing pick-up and drop-off dates/times, pick-up and drop-off locations, trip distances, itemized fares, rate types, payment types, and driver-reported passenger counts. The data is available in an Amazon S3 bucket and was initially difficult to read into a pandas DataFrame. This is where PySpark came in: it eased collecting the data from the S3 bucket, and its RDD model helps process large datasets.
import pandas as pd
# First attempt: read all twelve months of 2019 trip data with plain pandas
base = "https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2019-{:02d}.csv"
frames = [pd.read_csv(base.format(month)) for month in range(1, 13)]
result = pd.concat(frames)
This was giving a not-enough-memory error, so I decided to go with PySpark, whose RDD architecture consumes less RAM. I set up the environment for RDDs and upgraded the RAM to 25 GB with a standalone cluster on Google Colab.
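Before switching to PySpark, one pandas-only mitigation worth noting is chunked reading, which streams a CSV a few rows at a time and aggregates as it goes, so the whole file never sits in memory at once. A minimal sketch on a tiny stand-in file (the file name and column are made up for illustration; the real trip files are gigabytes each):

```python
import pandas as pd

# Write a tiny CSV to stand in for one month of trip data
pd.DataFrame({"fare_amount": range(10)}).to_csv("toy_trips.csv", index=False)

# Read it back in chunks of 4 rows, aggregating as we go so that
# only one chunk is ever held in memory
total = 0.0
for chunk in pd.read_csv("toy_trips.csv", chunksize=4):
    total += chunk["fare_amount"].sum()

print(total)  # sum of 0..9 = 45.0
```

This works well for simple aggregations, but whole-dataset operations (joins, sorts) still need something like Spark.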
2018 Taxi Data Collection
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark
# 2. Setup Environment Variables
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"
import findspark
findspark.init()
from pyspark import SparkContext
from pyspark.sql import HiveContext, SQLContext, Row
from pyspark.sql.types import *
from datetime import datetime
from pyspark.sql.functions import col, date_sub, log, mean, to_date, udf, unix_timestamp
from pyspark.sql.window import Window
from pyspark.sql import DataFrame
sc = SparkContext()
sc.setLogLevel("DEBUG")
sqlContext = SQLContext(sc)
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
from pyspark import SparkFiles
# Download each month of 2018 trip data, read it with Spark, and
# union the twelve monthly DataFrames into two half-year DataFrames
from functools import reduce
base = "https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2018-{:02d}.csv"
monthly_2018 = []
for month in range(1, 13):
    name = "yellow_tripdata_2018-{:02d}.csv".format(month)
    spark.sparkContext.addFile(base.format(month))
    monthly_2018.append(spark.read.csv("file://" + SparkFiles.get(name), header=True, inferSchema=True))
df_lowerhalf = reduce(DataFrame.union, monthly_2018[:6])  # January-June
df_upperhalf = reduce(DataFrame.union, monthly_2018[6:])  # July-December
Compressing and storing the data in Parquet format to enable faster processing
from google.colab import drive
drive.mount('/content/drive')
print("Shape of 2018 First Half")
print((df_lowerhalf.count(), len(df_lowerhalf.columns)))
row2018_1=df_lowerhalf.count()
col2018_1=df_lowerhalf.columns
print("Shape of 2018 Second Half")
print((df_upperhalf.count(), len(df_upperhalf.columns)))
row2018_2=df_upperhalf.count()
col2018_2=df_upperhalf.columns
Note: the step below will take time. You can download the .parquet files from the links:
#Creating Parquet files
df_upperhalf.write.parquet("/content/drive/My Drive/Data Science/2018secondhalf.parquet")
df_lowerhalf.write.parquet("/content/drive/My Drive/Data Science/2018firsthalf.parquet")
The Parquet files can now be read into a pandas DataFrame. I ran this command on the 25 GB Colab cluster. Note that it is important to specify the path to the Parquet file correctly.
import pandas as pd
y2018_part1=pd.read_parquet('/content/drive/My Drive/Data Science/2018firsthalf.parquet')
y2018_part1.shape
!pip install datalab
from datalab.context import Context
Running this step requires Google Cloud credentials with a project ID. Note: I am not sharing my credentials since they are tied to my account, but you can run this step after setting up your own credentials as shown in:
import os
os.environ["GOOGLE_APPLICATION_CREDENTIALS"]="/content/drive/My Drive/Data Science/nyc-taxi-265120-baf9a3e4cf9b.json"
%reload_ext google.cloud.bigquery
#Storing to Google Cloud Platform
project_id = 'nyc-taxi-265120' #@param{type:"string"}
y2018_part1.to_gbq('NYC.2018firsthalf',
Context.default().project_id,
chunksize=500000,
if_exists='append',
verbose=False
)
y2018_part2=pd.read_parquet('/content/drive/My Drive/Data Science/2018secondhalf.parquet')
#Storing to Google Cloud Platform
project_id = 'nyc-taxi-265120' #@param{type:"string"}
y2018_part2.to_gbq('NYC.2018SecondHalf',
Context.default().project_id,
chunksize=500000,
if_exists='append',
verbose=False
)
2019 Taxi Data Collection
# Download each month of 2019 trip data, read it with Spark, and
# union the twelve monthly DataFrames into two half-year DataFrames
from functools import reduce
base = "https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2019-{:02d}.csv"
monthly_2019 = []
for month in range(1, 13):
    name = "yellow_tripdata_2019-{:02d}.csv".format(month)
    spark.sparkContext.addFile(base.format(month))
    monthly_2019.append(spark.read.csv("file://" + SparkFiles.get(name), header=True, inferSchema=True))
df_lowerhalf_2019 = reduce(DataFrame.union, monthly_2019[:6])  # January-June
df_upperhalf_2019 = reduce(DataFrame.union, monthly_2019[6:])  # July-December
print("Shape of 2019 First Half")
print((df_lowerhalf_2019.count(), len(df_lowerhalf_2019.columns)))
row2019_1=df_lowerhalf_2019.count()
col2019_1=df_lowerhalf_2019.columns
print("Shape of 2019 Second Half")
print((df_upperhalf_2019.count(), len(df_upperhalf_2019.columns)))
row2019_2=df_upperhalf_2019.count()
col2019_2=df_upperhalf_2019.columns
df_upperhalf_2019.write.parquet("/content/drive/My Drive/Data Science/2019upper.parquet")
df_lowerhalf_2019.write.parquet("/content/drive/My Drive/Data Science/2019lower.parquet")
import pandas as pd
y2019upper=pd.read_parquet('/content/drive/My Drive/Data Science/2019upper.parquet')
y2019lower=pd.read_parquet('/content/drive/My Drive/Data Science/2019lower.parquet')
#Storing to Google Cloud Platform
project_id = 'nyc-taxi-265120' #@param{type:"string"}
y2019lower.to_gbq('NYC.2019firsthalf',
Context.default().project_id,
chunksize=500000,
if_exists='append',
verbose=False
)
#Storing to Google Cloud Platform
project_id = 'nyc-taxi-265120' #@param{type:"string"}
y2019upper.to_gbq('NYC.2019secondhalf',
Context.default().project_id,
chunksize=500000,
if_exists='append',
verbose=False
)
Weather Data
project_id = 'nyc-taxi-265120'
from google.cloud import bigquery
client = bigquery.Client(project = project_id)
sample_count = 2000
row_count = client.query('''
SELECT
COUNT(*) as total
FROM `bigquery-public-data.samples.gsod`''').to_dataframe().total[0]
df = client.query('''
SELECT
*
FROM
`bigquery-public-data.samples.gsod`
WHERE RAND() < %d/%d
''' % (sample_count, row_count)).to_dataframe()
print('Full dataset has %d rows' % row_count)
df.head()
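The `RAND()` filter in the query above keeps each row independently with probability `sample_count/row_count`, which yields a random sample of roughly `sample_count` rows without downloading the full table. The same idea in pandas, on a made-up table (the column name and sizes are illustrative):

```python
import numpy as np
import pandas as pd

rng = np.random.default_rng(0)
table = pd.DataFrame({"mean_temp": rng.normal(60.0, 10.0, 100_000)})

sample_count = 2000
row_count = len(table)

# Keep each row independently with probability sample_count/row_count,
# mirroring WHERE RAND() < sample_count/row_count in BigQuery
sample = table[rng.random(row_count) < sample_count / row_count]
```

The sample size is binomially distributed around 2,000, not exactly 2,000, which is fine for exploratory work like this.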
Taxi Zone Data
SELECT COUNT(*) FROM `bigquery-public-data.new_york_taxi_trips.taxi_zone_geom` LIMIT 1000
df = client.query('''SELECT * FROM `bigquery-public-data.new_york_taxi_trips.taxi_zone_geom` ''' ).to_dataframe()
df.head()
import pandas as pd
import numpy as np
import sys
from google.cloud import bigquery
from google.colab import auth
auth.authenticate_user()
import matplotlib.pyplot as plt
fig = plt.figure()
ax = fig.add_axes([0,0,1,1])
dataset = ['2018 1st half', '2018 2nd half', '2019 first half', '2019 second half']
rows = [row2018_1,row2018_2,row2019_1,row2019_2]
plt.xlabel('Dataset')
plt.ylabel('Rows')
ax.bar(dataset,rows)
plt.show()
import matplotlib.pyplot as plt
fig = plt.figure()
ax = fig.add_axes([0,0,1,1])
dataset = ['2018 1st half', '2018 2nd half', '2019 first half', '2019 second half']
columns = [len(col2018_1), len(col2018_2), len(col2019_1), len(col2019_2)]
plt.xlabel('Dataset')
plt.ylabel('Columns')
ax.bar(dataset,columns)
plt.show()
After using PySpark and the pandas to_gbq function, I uploaded the entire dataset to the Google Cloud Platform. The screenshot below depicts the full set of datasets used in this project:
Understanding discovery: discovery is an information-gathering process meant to dig deep into what matters to a client's business, target audience, and industry. The more information you gather, interpret, and comprehend, the better prepared you are to deliver a site on budget and on target. This step was essential to determine the following: